package com.dahuan.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_Filter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        //从文件读取数据
        String path = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        DataStreamSource<String> stringDataStreamSource = env.readTextFile( path );

        //筛选sensor_1开头的id对应的数据
        stringDataStreamSource.filter( new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                //包含sensor_10
                return value.startsWith( "sensor_1" );
            }
        } ).print();

        env.execute( "Transform_Filter" );
    }
}
